Conversation

@Dref360 (Contributor) commented Oct 29, 2018

Summary

This PR solves an issue where a worker dies and the Pool is not told (OOM, pkill, etc.).

This puts a timeout on the future and notifies the user of the issue.
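Roughly, the idea is the following (a minimal sketch, not the exact diff; `get_with_timeout` is an illustrative helper name):

```python
import multiprocessing as mp
import warnings

def get_with_timeout(future, timeout=30):
    """Fetch a computed sample; warn instead of hanging forever
    if the worker that owned it has died."""
    try:
        # `future` is the AsyncResult returned by Pool.apply_async.
        return future.get(timeout)
    except mp.TimeoutError:
        warnings.warn('An input could not be retrieved.'
                      ' It could be because a worker has died.',
                      UserWarning)
        return None
```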

Discussion

What should we do with those samples in the case of a Sequence? Should we re-queue the task, compute the sample directly, or just drop it?

Related Issues

PR Overview

  • This PR requires new unit tests [y/n] (make sure tests are included)
  • This PR requires documentation updates [y/n] (make sure the docs are up-to-date)
  • This PR is backwards compatible [y/n]
  • This PR changes the current API [y/n] (all API changes need to be approved by fchollet)

@gabrieldemarmiesse (Contributor) commented Oct 30, 2018

If I understand correctly (I'm by no means an expert in multithreading/multiprocessing), in the implementation proposed in this PR, the batch is dropped and a warning is raised. Is that right?

If it's not too much trouble to implement, I believe the best option is to recompute the sample and raise a warning (see the sketch after this list), for these reasons:

  • We don't take the risk of breaking things, since other Keras components might crash or behave in a weird way if the Sequence doesn't yield the right number of batches.

  • We keep the current behavior. It's also the behavior users expect.

  • Changing the order of the batches can leave users wondering what is going on (silent unexpected behavior) if they overlook the warning, though this is unlikely to happen in practice.

  • If the crash is caused by a sample and is deterministic, the user is interested in the stacktrace. If we recompute it in the main process/thread, the stacktrace will be displayed.

  • On the downside, we'll incur a performance penalty.

I might be wrong on certain points, as I don't know much about all this.
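To make the suggestion concrete, here is a rough sketch of the recompute-and-warn fallback (a hypothetical helper, not code from this PR; it assumes the future carries its batch index, as in this PR, and that the Sequence is reachable from the main process):

```python
import multiprocessing as mp
import warnings

def get_batch(future, sequence, timeout=30):
    try:
        return future.get(timeout)
    except mp.TimeoutError:
        warnings.warn('The input {} could not be retrieved.'
                      ' Recomputing it in the main process.'.format(future.idx),
                      UserWarning)
        # Recompute synchronously: slower, but the epoch still yields the
        # expected number of batches in the expected order, and a
        # deterministic crash will now surface its stacktrace directly.
        return sequence[future.idx]
```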

```diff
+ future.idx = i
  self.queue.put(
-     executor.apply_async(get_index, (self.uid, i)), block=True)
+     future, block=True)
```
Review comment (Contributor):
I don't think the line needs to be broken.

"An input could not be retrieved."
" It could be because a worker has died."
"We do not have any information on the lost sample."
.format(),
Review comment (Contributor):

I don't believe the `.format()` call is useful here.

Review comment (Collaborator):

+1, maybe you intended to display the index in the warning?

Review comment (Contributor, PR author):

Yeah, but we cannot know the index in a generator.

@fchollet (Collaborator) left a comment:

I am curious about the interaction with fit_generator when batches are dropped. Does `while steps_done < steps_per_epoch:` just complete fine? Are other batches drawn at random instead of the dropped ones?

```python
except mp.TimeoutError:
    idx = future.idx
    warnings.warn(
        "The input {} could not be retrieved."
```
Review comment (Collaborator):

Nit: use ' as quote character for consistency.

Same below.

"An input could not be retrieved."
" It could be because a worker has died."
"We do not have any information on the lost sample."
.format(),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1, maybe you intended to display the index in the warning?

@Dref360 (Contributor, PR author) commented Nov 19, 2018

As of now, there is no way to inform fit_generator to skip an index, so every step will be offset.

Quick note that the "real" fix for Travis is to use spawn instead of fork. The issue is that it's only doable with Python 3.
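For reference, switching the start method would look roughly like this (Python 3 only, since `multiprocessing.get_context` was added in Python 3.4):

```python
import multiprocessing

# 'spawn' starts each worker in a fresh interpreter instead of fork()ing
# the parent process, so workers do not inherit the parent's (possibly
# large) memory footprint, which should help with the OOM cases.
ctx = multiprocessing.get_context('spawn')
pool = ctx.Pool(processes=4)
```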

@fchollet (Collaborator) commented
> Quick note that the "real" fix for Travis is to use spawn instead of fork. The issue is that it's only doable with Python 3.

I think it would be acceptable to only test this functionality with Python 3. Our #1 priority should be to make CI reliable and fast.

@gabrieldemarmiesse (Contributor) commented

Currently, most of the timeouts happen with CNTK and Python 3.6, so fixing the timeouts for 3.6 would be a very good first step.

@Dref360 (Contributor, PR author) commented Nov 27, 2018

Now the multiprocessing tests on Sequences run on all backends.
Also, the index is fetched after the timeout; I think this is the behaviour users would expect.

We do see some GitHub issues where training stops (when using HDF5 and/or OOM).
We could force the OrderedEnqueuer to use spawn when available, which would solve some of these issues (the OOM ones).

@gabrieldemarmiesse (Contributor) left a comment:

A few improvements to the tests are possible.

Since I don't know much about multiprocessing, a second review (maybe from @fchollet) would be beneficial.

```python
for _ in range(11):
    next(gen_output)
assert "The input {} could not be retrieved.".format(
    missing_idx) in str(w[-1].message)
```
Review comment (Contributor):
The test will fail if other warnings are emitted. I would recommend using pytest's utility to check that warnings are triggered correctly: https://docs.pytest.org/en/latest/warnings.html#warns
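For instance, something along these lines (a sketch reusing the test's own names, and assuming the warning is a UserWarning):

```python
import pytest

# pytest.warns fails the test if no matching warning is raised, and it
# is not confused by unrelated warnings emitted at the same time.
with pytest.warns(UserWarning, match='could not be retrieved'):
    for _ in range(11):
        next(gen_output)
```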

```python
warnings.simplefilter("always")
for _ in range(4 * missing_idx):
    next(gen_output)
assert 'An input could not be retrieved.' in str(w[-1].message)
```
Review comment (Contributor):

Same here.

@gabrieldemarmiesse (Contributor) commented

@Dref360 I updated the PR.

@fchollet could you review? Thank you.

@gabrieldemarmiesse (Contributor) commented

Ping @fchollet could you review when you have the time? Thank you.

@fchollet (Collaborator) left a comment:

LGTM as far as I can tell. Shall we merge it?

@Dref360 (Contributor, PR author) commented Dec 17, 2018

I think this is LGTM; it gives the user "some" information, and we can move forward with using "spawn" when possible on Travis.

@gabrieldemarmiesse (Contributor) commented

LGTM.

@gabrieldemarmiesse merged commit ca802e1 into keras-team:master on Dec 17, 2018
@gabrieldemarmiesse (Contributor) commented

Thanks @Dref360 for working on this!
